package com.zoujiaqing.kafka.example.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zoujiaqing.kafka.example.domain.Order;
import com.zoujiaqing.kafka.example.domain.OrderMessage;
import com.zoujiaqing.kafka.example.domain.OrderStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

/**
 * Publishes order events to Kafka and consumes order status updates.
 *
 * <p>{@link #createOrder(Order)} writes the order and an initial
 * {@link OrderStatus#CREATED} status message as JSON strings;
 * {@link #handleOrderStatusUpdate(String, Acknowledgment, String)} reads status
 * messages back from the status topic and runs the follow-up steps.
 *
 * <p>The persistence, logistics and notification steps in this class only log;
 * they are placeholders for real integrations.
 */
@Slf4j
@Service
public class OrderService {
    /** Topic that receives the JSON-serialized {@link Order}. */
    private static final String ORDER_TOPIC = "order";

    /** Topic that carries JSON-serialized {@link OrderMessage} status updates. */
    private static final String ORDER_STATUS_UPDATE_TOPIC = "order-status-update";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Publishes a new order and its initial {@link OrderStatus#CREATED} status update.
     *
     * <p>The return values of the two {@code send} calls are not inspected here,
     * so a failure to deliver a record is not detected by this method.
     *
     * @param order the order to publish; must not be {@code null}
     * @throws JsonProcessingException if the order or the status message cannot be
     *         serialized to JSON; in that case nothing has been sent
     * @throws NullPointerException if {@code order} is {@code null}
     */
    public void createOrder(Order order) throws JsonProcessingException {
        // Reject a null order up front; otherwise the literal JSON "null" would be
        // published to both topics before the final log statement fails.
        if (order == null) {
            throw new NullPointerException("order must not be null");
        }

        OrderMessage message = new OrderMessage();
        message.setOrder(order);
        message.setStatus(OrderStatus.CREATED);

        // Serialize both payloads before sending anything, so a serialization
        // failure cannot leave the order published without its status update.
        String orderPayload = objectMapper.writeValueAsString(order);
        String statusPayload = objectMapper.writeValueAsString(message);

        // 1. Publish the order itself.
        kafkaTemplate.send(ORDER_TOPIC, orderPayload);
        // 2. Publish the matching status-update message.
        kafkaTemplate.send(ORDER_STATUS_UPDATE_TOPIC, statusPayload);

        log.info("订单已创建 #{}", order.getId());
    }

    /**
     * Consumes one status-update record, applies it, and acknowledges it.
     *
     * <p>The record is acknowledged only after every processing step has
     * completed. If any step throws, the record is left unacknowledged; what
     * happens next (retry, skip, dead-letter) is decided by the listener
     * container's configuration, which is not visible in this class.
     *
     * @param message the raw JSON payload, expected to deserialize to {@link OrderMessage}
     * @param ack handle used to acknowledge the record
     *        (NOTE(review): presumably requires a manual ack mode on the container - confirm)
     * @param topic the topic the record was received from; used for logging only
     * @throws RuntimeException if the payload cannot be deserialized
     * @throws IllegalArgumentException if the payload deserializes to a message without an order
     */
    @KafkaListener(topics = ORDER_STATUS_UPDATE_TOPIC)
    public void handleOrderStatusUpdate(String message, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        // 1. Decode the status-update message.
        OrderMessage orderMessage;
        try {
            orderMessage = objectMapper.readValue(message, OrderMessage.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to convert message to OrderMessage: " + message, e);
        }

        // A payload of JSON "null", or one lacking the order, would otherwise fail
        // below with a NullPointerException that does not identify the record.
        if (orderMessage == null || orderMessage.getOrder() == null) {
            throw new IllegalArgumentException("Order status update carries no order: " + message);
        }

        // 2. Extract the order and its new status.
        Order order = orderMessage.getOrder();
        OrderStatus status = orderMessage.getStatus();
        log.debug("订单状态更新 #{}", order.getId());

        // 3. Persist the new status.
        updateOrderStatus(order, status);

        // 4. For a paid order, run the follow-up steps.
        if (status == OrderStatus.PAID) {
            // 4.1 Generate logistics information.
            generateLogisticsInfo(order);
            // 4.2 Send the payment-success notification.
            sendPaymentSuccessNotification(order);
        }

        // 5. Acknowledge last, so a failure in any step above does not mark the
        //    record as consumed. An acknowledgment failure is logged, not rethrown.
        try {
            ack.acknowledge();
            log.info("消息消费成功 topic:{}, msg:{}", topic, message);
        } catch (Exception e) {
            log.error("消费消息失败 topic:{}, msg:{}", topic, message, e);
        }
    }

    /** Placeholder for persisting the order status; currently only logs the order id. */
    private void updateOrderStatus(Order order, OrderStatus status) {
        log.info("更新订单 #{} 状态到数据库", order.getId());
    }

    /** Placeholder for generating logistics information; currently only logs. */
    private void generateLogisticsInfo(Order order) {
        log.info("生成物流信息");
    }

    /** Placeholder for the payment-success notification; currently only logs. */
    private void sendPaymentSuccessNotification(Order order) {
        log.info("发送支付成功通知，可以通过发短信，邮箱通知");
    }
}
